package com.atguigu.flink.day06;

import com.atguigu.flink.beans.WaterSensor;
import com.atguigu.flink.func.WaterSensorMapFunction;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author Felix
 * @date 2023/12/6
 * 该案例演示了水位线的传递
 */
public class Flink04_watermark_pass {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<WaterSensor> mapDS = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());
        mapDS.print(">>>");
        SingleOutputStreamOperator<WaterSensor> wsDS = mapDS
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<WaterSensor>forMonotonousTimestamps()
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor ws, long recordTimestamp) {
                                return ws.getTs();
                            }
                        }
                    ).withIdleness(Duration.ofSeconds(50))
            );
        /*SingleOutputStreamOperator<WaterSensor> wsDS = env
            .socketTextStream("hadoop102", 8888)
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<String>forMonotonousTimestamps()
                    .withTimestampAssigner(
                        new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String lineStr, long recordTimestamp) {
                                return Long.valueOf(lineStr.split(",")[1]);
                            }
                        }
                    )

            ).map(new WaterSensorMapFunction());*/


        KeyedStream<WaterSensor, String> keyedDS = wsDS.keyBy(WaterSensor::getId);

        WindowedStream<WaterSensor, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.milliseconds(10)));

        windowDS.process(
            new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    long count = elements.spliterator().estimateSize();
                    String windowStart = DateFormatUtils.format(context.window().getStart(), "yyyy-MM-dd HH:mm:ss.SSS");
                    String windowEnd = DateFormatUtils.format(context.window().getEnd(), "yyyy-MM-dd HH:mm:ss.SSS");
                    out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements.toString());
                }
            }
        ).print();

        env.execute();
    }
}
